37972f7d5f6ba11ee3817067a34382d0d9f8ff3d,src/main/java/com/continuuity/gateway/accessor/DatasetRestHandler.java,DatasetRestHandler,handleTableOperation,#MessageEvent#HttpRequest#MetricsHelper#LinkedList#Map#OperationContext#,249
Before Change
// optional parameter encoding
String encoding = getEncodingParameter(message, request, helper, parameters);
if (encoding == null) { // error
return;
}
// make sure the operations and parameters are valid
HttpMethod method = request.getMethod();
if (HttpMethod.GET.equals(method)) {
// can be either a list or a read, list would have had ?op=list
if (operation == TableOp.List) {
if (row != null) {
respondBadRequest(message, request, helper, "list operation cannot have row key");
return;
} else {
respondBadRequest(message, request, helper,
"list operation not implemented", HttpResponseStatus.NOT_IMPLEMENTED);
return;
}
}
// make sure no other operation was given with ?op=
if (operation != null) {
respondBadRequest(message, request, helper, "invalid operation for method GET");
return;
}
// must be a read, requires a row
if (row == null) {
respondBadRequest(message, request, helper, "read must have a row key");
return;
}
if (columns != null && !columns.isEmpty() && (start != null || stop != null)) {
respondBadRequest(message, request, helper, "read can only specify columns or range");
return;
}
operation = TableOp.Read;
} else if (HttpMethod.DELETE.equals(method)) {
// make sure no operation was given with ?op=
if (operation != null) {
respondBadRequest(message, request, helper, "invalid operation for method GET");
return;
}
// must be a delete, requires a row
if (row == null) {
respondBadRequest(message, request, helper, "delete must have a row key");
return;
}
if (columns == null || columns.isEmpty()) {
respondBadRequest(message, request, helper, "delete must have columns");
return;
}
operation = TableOp.Delete;
} else if (HttpMethod.PUT.equals(method)) {
// make sure no operation was given with ?op=
if (operation != null) {
respondBadRequest(message, request, helper, "invalid operation for method PUT");
return;
}
// a PUT without a row key creates the table; with a row key it is a write
if (row == null) {
operation = TableOp.Create;
} else {
operation = TableOp.Write;
}
} else if (HttpMethod.POST.equals(method)) {
// POST requires an explicit operation given with ?op=
if (operation == null) {
respondBadRequest(message, request, helper, "missing operation for method POST");
return;
}
if (operation != TableOp.Increment) {
respondBadRequest(message, request, helper, "invalid operation for method POST");
return;
}
// must be increment, requires a row
if (row == null) {
respondBadRequest(message, request, helper, "increment must have a row key");
return;
}
operation = TableOp.Increment;
}
Type stringMapType = new TypeToken<Map<String, String>>() {}.getType();
// Type longMapType = new TypeToken<Map<String, Long>>() {}.getType();
// for operations write and increment, there must be a JSON string in the body
Map<String, String> valueMap = null;
// Map<String, Long> longMap = null;
try {
if (operation == TableOp.Increment || operation == TableOp.Write) {
InputStreamReader reader = new InputStreamReader(
new ChannelBufferInputStream(request.getContent()), Charsets.UTF_8);
if (operation == TableOp.Write) {
valueMap = new Gson().fromJson(reader, stringMapType);
} else {
// does not seem to work, Gson returns Map<String,String>
// longMap = new Gson().fromJson(reader, longMapType);
valueMap = new Gson().fromJson(reader, stringMapType);
}
}
} catch (Exception e) {
// failed to parse json, that is a bad request
respondBadRequest(message, request, helper, "failed to read body as json: " + e.getMessage());
return;
}
if (operation.equals(TableOp.Create)) {
DataSetSpecification spec = new Table(tableName).configure();
Dataset ds = new Dataset(spec.getName());
ds.setName(spec.getName());
ds.setType(spec.getType());
ds.setSpecification(new Gson().toJson(spec));
try {
this.accessor.getMetadataService().assertDataset(new Account(opContext.getAccount()), ds);
} catch (MetadataServiceException e) {
respondBadRequest(message, request, helper, "table already exists", HttpResponseStatus.CONFLICT);
return;
} catch (TException e) {
helper.finish(Error);
LOG.error("Thrift error creating table: " + e.getMessage(), e);
respondError(message.getChannel(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
return;
}
respondSuccess(message.getChannel(), request);
helper.finish(Success);
return;
}
// make sure the table exists and instantiate dataset
Table table;
try {
table = this.accessor.getInstantiator().getDataSet(tableName, opContext);
} catch (Exception e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Cannot instantiate requested table '" + tableName + "' (" +
e.getMessage() + ") for URI '" + request.getUri() + "'");
}
helper.finish(BadRequest);
respondError(message.getChannel(), HttpResponseStatus.NOT_FOUND);
return;
}
// try to convert the row key to bytes, using the given encoding
byte[] rowKey;
try {
rowKey = row == null ? null : Util.decodeBytes(row, encoding);
} catch (Exception e) {
respondBadRequest(message, request, helper, "error decoding row key", e);
return;
}
if (operation.equals(TableOp.List)) {
// TODO not implemented
} else if (operation.equals(TableOp.Read)) {
Read read;
try {
if (columns == null || columns.isEmpty()) {
// column range
byte[] startCol = Util.decodeBytes(start, encoding);
byte[] stopCol = Util.decodeBytes(stop, encoding);
read = new Read(rowKey, startCol, stopCol, limit == null ? -1 : limit);
} else {
byte[][] cols = new byte[columns.size()][];
After Change
if (columns == null || columns.isEmpty()) {
// column range
byte[] startCol = start == null ? null : Util.decodeBinary(start, encoding);
byte[] stopCol = stop == null ? null : Util.decodeBinary(stop, encoding);
read = new Read(rowKey, startCol, stopCol, limit == null ? -1 : limit);
} else {
byte[][] cols = new byte[columns.size()][];